package com.flink.java.demo.aggreagte;

import com.flink.java.demo.bean.WaterSensor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 对于Flink而言，DataStream是没有直接进行聚合的API的。因为我们对海量数据做聚合肯定要进行分区并行处理，这样才能提高效率。
 * 所以在Flink中，要做聚合，需要先进行分区；
 * 这个操作就是通过keyBy来完成的。
 *
 * 准确来说，聚合运算是keyBy之后同一个分组内的数据才聚合。比如： A、B两个key 分别进入两个分组，但是可能经过hash取模之后计算的分区是一样的。
 *  在聚合运算的时候A和B虽然处在一个分区内，但是数据是分开计算的。
 *
 *
 * keyBy是聚合前必须要用到的一个算子。
 * keyBy通过指定键（key），可以将一条流从逻辑上划分成不同的分区（partitions）。
 * 这里所说的分区，其实就是并行处理的子任务。
 * 基于不同的key，流中的数据将被分配到不同的分区中去；这样一来， 【所有具有相同的key的数据，都将被发往同一个分区】
 *
 */
public class KeybyDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);


        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 11L, 11),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3)
        );

        // 按照 id 分组
        /**
         *  keyby： 按照id分组
         * 要点：
         *  1、返回的是 一个 KeyedStream，键控流
         *  2、keyby不是 转换算子， 只是对数据进行重分区, 不能设置并行度
         *  3、分组 与 分区 的关系：
         *    1） keyby是对数据分组，保证 相同key的数据 在同一个分区（子任务）
         *    2） 分区： 一个子任务可以理解为一个分区，一个分区（子任务）中可以存在多个分组（key）
         */
        KeyedStream<WaterSensor, String> sensorKS = sensorDS
                .keyBy(new KeySelector<WaterSensor, String>() {
                    @Override
                    public String getKey(WaterSensor value) throws Exception {
                        return value.getId();
                    }
                });

        sensorKS.print();


        env.execute();
    }


}
